The focus of this notebook is on the development of a block-based method for computing distance matrices. The end goal is to produce a method compatible with Fast UniFrac that has reasonable runtime, and dramatically reduced space requirements relative to just computing beta diversity in a single shot. The intuition is that "blocks" of the resulting distance matrix can be computed independently, and that computing a "block" of samples (e.g., 32 at a time) has dramatically lower memory requirements than computing all of the samples at once (e.g., thousands). The reason for this is that Fast UniFrac produces an internal data structure called the counts_array
which requires on the order of O(n * m * log(m))
space, where n
is the number of samples and m
is the number of OTUs. By only computing a subset of samples at a time, both n
and m
are reduced with the former being a reducing simply by the number of samples being evaluated and the latter on the general case where the examination of fewer samples necessitates representing fewer OTUs in the tree.
There still exist optimizations to be performed. The block method is currently setup so that it can be parallelized (as each block is independent), but at the moment, the computation is performed serially. In addition, the block method results in duplicated compute with some positions in the distance matrix being computed ceil(n / k)
times where n
is the number of samples and k
is the block size. However, the compute of an individual pairwise calculation is small relative to the expense of creating a counts_array
. Note, this suggests using a large k
but it has been observed that a large k
can drive runtime up possibly due to needing a larger number of OTUs represented within a block, and thus increasing the expense in space and time of the construction of the counts_array
.
During development, it became apparent that the TreeNode
is very large to represent in memory, and Fast UniFrac doesn't actually need to operate on it directly. One of the bottlenecks identified was the TreeNode.shear
method which is used to take a subset of the tree based on the OTUs represented by the samples within a block. More discussion on this new shear
method can be found here.
In [31]:
from time import time
from random import shuffle
from collections import Counter
import os
import numpy as np
import psutil # pip install psutil
from skbio import read, TreeNode, DistanceMatrix
from skbio.diversity import beta_diversity
from memory_profiler import memory_usage # pip install memory_profiler
from biom import load_table
process = psutil.Process(os.getpid())
In [66]:
from ipyparallel import Client
import sys
from functools import partial
client = Client()
dv = client[:]
bv = client.load_balanced_view()
with dv.sync_imports():
import numpy as np
from skbio.diversity import beta_diversity
class MockTreeNode(object):
def __init__(self, original_tree_array):
self.original_tree_array = original_tree_array
def to_array(self, nan_length_value=0.0):
import numpy as np
self.original_tree_array['length'][np.isnan(self.original_tree_array['length'])] = nan_length_value
return self.original_tree_array
def shear(self, to_keep):
return MockTreeNode(shear(self.original_tree_array, to_keep))
dv['MockTreeNode'] = MockTreeNode
def shear(indexed, to_keep):
"""Shear off nodes from a tree array
Parameters
----------
indexed : dict
The result of TreeNode.to_array
to_keep : set
The tip IDs of the tree to keep
Returns
-------
dict
A TreeNode.to_array like dict with the exception that "id_index" is not
provided, and any extraneous attributes formerly included are not
passed on.
Notes
-----
Unlike TreeNode.shear, this method does not prune (i.e., collapse single
descendent nodes). This is an open development target.
This method assumes that to_keep is a subset of names in the tree.
The order of the nodes remains unchanged.
"""
import numpy as np
# nodes to keep mask
mask = np.zeros(len(indexed['id']), dtype=np.bool)
# set any tips marked "to_keep"
tips_to_keep = [i for i, n in enumerate(indexed['name']) if n in to_keep]
mask[np.asarray(tips_to_keep)] = True
# perform a post-order traversal and identify any nodes that should be
# retained
new_child_index = []
for node_idx, child_left, child_right in indexed['child_index']:
being_kept = mask[child_left:child_right + 1]
# NOTE: the second clause is an explicit test to keep the root node. This
# may not be necessary and may be a remenant of mucking around.
if being_kept.sum() >= 1 or node_idx == indexed['id'][-1]:
mask[node_idx] = True
# we now know what nodes to keep, so we can create new IDs for assignment
new_ids = np.arange(mask.sum(), dtype=int)
# construct a map that associates old node IDs to the new IDs
id_map = {i_old: i_new for i_old, i_new in zip(indexed['id'][mask], new_ids)}
# perform another post-order traversal to construct the new child index arrays
# which provide index positions of the desecendents of a given internal node.
for node_idx, child_left, child_right in indexed['child_index']:
being_kept = mask[child_left:child_right + 1]
# NOTE: the second clause is an explicit test to keep the root node. This
# may not be necessary and may be a remenant of mucking around.
if being_kept.sum() >= 1 or node_idx == indexed['id'][-1]:
new_id = id_map[node_idx]
child_indices = indexed['id'][child_left:child_right + 1][being_kept]
left_child = id_map[child_indices[0]]
right_child = id_map[child_indices[-1]]
new_child_index.append([new_id, left_child, right_child])
new_child_index = np.asarray(new_child_index)
return {'child_index': new_child_index,
'length': indexed['length'][mask],
'name': indexed['name'][mask],
'id': new_ids}
dv['shear'] = shear
def _block_execute(metric, tree, table, ids):
row_ids, col_ids = ids
ids_to_keep = row_ids.union(col_ids)
block = table.filter(ids_to_keep, inplace=False)
block.filter(lambda v, i, md: v.sum() > 0, axis='observation')
block_ids = block.ids()
block_otu_ids = block.ids(axis='observation')
block_tree = tree.shear(block_otu_ids)
block_matrix = block.matrix_data.astype(int).T.toarray()
block_dmat = beta_diversity(metric, block_matrix, block_ids,
tree=block_tree, otu_ids=block_otu_ids,
validate=False)
return (row_ids, col_ids, block_dmat)
def _blockinator(ids, block_size):
"""Get blocks of IDs"""
for row_start in range(0, len(ids), block_size):
for col_start in range(row_start, len(ids), block_size):
row_ids = set(ids[row_start:row_start + block_size])
col_ids = set(ids[col_start:col_start + block_size])
yield (row_ids, col_ids)
def block_dist(tree, table, metric, block_size=64):
"""Perform a block-based computation of a distance matrix
Parameters
----------
tree : TreeNode-like object
A Tree
table : biom
A biom table of the samples and observations
metric : str, one of {unweighted_unifrac, weighted_unifrac}
The method to use
block_size : int
The size of the block in the resulting distance matrix to
compute at a time.
Returns
-------
DistanceMatrix
The computed distance matrix
"""
global dv
global bv
table._sample_metadata = None
table._observation_metadata = None
_block_executer = partial(_block_execute, 'unweighted_unifrac', tree, table)
dv['_block_execute'] = _block_execute
dv['_block_executer'] = _block_executer
ids = table.ids()
dmat = np.zeros((len(ids), len(ids)), dtype=float)
dmat_index = {i: idx for i, idx in zip(ids, range(len(ids)))}
reduce_total = 0
for row_ids, col_ids, block_dmat in bv.map(_block_executer, list(_blockinator(ids, block_size)), chunksize=1):
reduce_time = time()
for i in block_dmat.ids:
i_idx = block_dmat.index(i)
i_dmat_index = dmat_index[i]
for j in block_dmat.ids[i_idx:]:
j_idx = block_dmat.index(j)
dmat[i_dmat_index, dmat_index[j]] = block_dmat.data[i_idx, j_idx]
reduce_total += (time() - reduce_time)
print(reduce_total)
return DistanceMatrix(dmat + dmat.T, ids=ids)
In [62]:
def bench(tree, table, number_otus, number_samples, block_size, metric):
"""Benchmark the block and regular beta diversity methods
Parameters
----------
tree : path
File path to the tree to load
table : path
File path to the table to load
number_otus : int
Number of OTUs to use in the test
number_samples : int
Number of samples to use in the test
block_size : int
The blocksize to use for the test
metric : str, {unweighted_unifrac, weighted_unifrac}
"""
# the time to read the tree and BIOM table
start = time()
tree = read(tree, into=TreeNode)
table = load_table(table)
for node in tree.traverse(include_self=False):
if node.length is None:
node.length = 0.0
# aggressively clean up leaky variables so the original tree can be freed
del node
# get sample subset
sample_ids = table.ids()
# samples must have at least 1000 sequences
sample_ids = [i for i, v in zip(sample_ids, table.sum(axis='sample')) if v >= 1000]
shuffle(sample_ids)
sample_ids = sample_ids[:number_samples]
table.filter(sample_ids)
# observations must exist in at least .1% of samples
table.filter(lambda v, i, md: ((v != 0).sum() / len(sample_ids)) >= 0.001, axis='observation')
# get otu subset of the tree
table_obs_ids = table.ids(axis='observation')
table_obs_idx_lookup = {i: idx for idx, i in enumerate(table_obs_ids)}
otu_ids = [n.name for n in tree.tips()]
otu_ids = list(set(otu_ids).intersection(set(table_obs_ids))) # make sure OTUs overlap with table
shuffle(otu_ids)
otu_ids = otu_ids[:number_otus]
# construct a MockTreeNode using skbio's TreeNode.shear method
tree_array = MockTreeNode(tree.shear(otu_ids).to_array())
# delete unnecessary references to the tree, drop the tree and request a cleanup
del tree_array.original_tree_array['id_index']
del tree
import gc; gc.collect()
# remove excess OTUs from the table
table.filter(otu_ids, axis='observation')
# remove samples without any OTUs (hopefully a small number...)
table.filter(lambda v, i, md: v.sum() > 0)
print("# spinuptime: %f" % (time() - start))
print("# spinupmem: %f" % (process.memory_info().rss / 2**20))
# if we dropped more than 10% of desired samples do to filtering about, let's bail
if ((number_samples - len(table.ids())) / float(number_samples)) > 0.1:
print(number_samples)
print(len(table.ids()))
raise ValueError("Dropped too many samples!")
# run the block method
args = (tree_array, table, metric, block_size)
block_start = time()
(block_usage, block_result) = memory_usage((block_dist, args),
interval=2, max_usage=True,
retval=True)
block_time = time() - block_start
print("#number_otus\tnumber_samples\tblocksize\truntime\tpeakmem\tmethod\n")
print('\t'.join([str(i) for i in [number_otus, number_samples,
block_size, block_time,
block_usage[-1], 'block']]))
# run the normal method
test_matrix = table.matrix_data.astype(int).T.toarray()
args = (metric, test_matrix, table.ids())
kwargs = {'tree': tree_array, 'otu_ids':table.ids(axis='observation'),
'validate': False}
normal_start = time()
(usage, result) = memory_usage((beta_diversity, args, kwargs),
interval=2, max_usage=True, retval=True)
normal_time = time() - normal_start
print('\t'.join([str(i) for i in [number_otus, number_samples,
'NA', normal_time, usage[-1],
'regular']]))
if not np.allclose(block_result.data, result.data):
print(block_result.ids == result.ids)
print(block_result.data[:5, :5])
print(result.data[:5, :5])
raise ValueError
In [67]:
gg_tree = '/Users/daniel/miniconda3/envs/qiime191/lib/python2.7/site-packages/qiime_default_reference/gg_13_8_otus/trees/97_otus.tree'
ag_table = '/Users/daniel/rs/American-Gut/data/AG/AG_even1k.biom'
number_otus = 10000
number_samples = 1000
block_size = 256
metric = 'unweighted_unifrac'
bench(gg_tree, ag_table, number_otus, number_samples, block_size, metric)
In [51]:
# 10000 otus, 1000 samples, 128 blocksize on the AG data
# 118 v 65 on 1 core
# 60 v 65 on 2 core
# 39 v 65 on 3 core
# 35 v 65 on 4 core
# each thread peaks around 150MB
In [ ]:
# 10000 otus, 1000 samples, 64 blocksize on the AG data
# 64 v 65 on 3 core
# 45 v 62 on 4 core
# each thread peaks around 125MB
In [ ]:
# 10000 otus, 1000 samples, 256 blocksize on the AG data
# 30 v 62 on 4 core
# 54 v 63 on 2 core
# 10000 otus, 1000 samples, 512 blocksize on the AG data
# 62 v 62 on 2 core
# 62 v 62 on 4 core